Add S3Agent to read and write files to S3 buckets

Dominik Sander 8 年之前
父節點
當前提交
40835e981f
共有 4 個文件被更改,包括 433 次插入0 次删除
  1. 3 0
      Gemfile
  2. 4 0
      Gemfile.lock
  3. 206 0
      app/models/agents/s3_agent.rb
  4. 220 0
      spec/models/agents/s3_agent_spec.rb

+ 3 - 0
Gemfile

@@ -66,6 +66,9 @@ gem 'evernote_oauth'
66 66
 # LocalFileAgent (watch functionality)
67 67
 gem 'listen', '~> 3.0.5', require: false
68 68
 
69
+# S3Agent
70
+gem 'aws-sdk-core', '~> 2.2.15'
71
+
69 72
 # Optional Services.
70 73
 gem 'omniauth-37signals'          # BasecampAgent
71 74
 gem 'omniauth-wunderlist', github: 'wunderlist/omniauth-wunderlist', ref: 'd0910d0396107b9302aa1bc50e74bb140990ccb8'

+ 4 - 0
Gemfile.lock

@@ -110,6 +110,8 @@ GEM
110 110
       addressable (>= 2.3.1)
111 111
       extlib (>= 0.9.15)
112 112
       multi_json (>= 1.0.0)
113
+    aws-sdk-core (2.2.15)
114
+      jmespath (~> 1.0)
113 115
     bcrypt (3.1.10)
114 116
     better_errors (1.1.0)
115 117
       coderay (>= 1.0.0)
@@ -279,6 +281,7 @@ GEM
279 281
     hypdf (1.0.7)
280 282
       httmultiparty (= 0.3.10)
281 283
     i18n (0.7.0)
284
+    jmespath (1.1.3)
282 285
     jquery-rails (3.1.3)
283 286
       railties (>= 3.0, < 5.0)
284 287
       thor (>= 0.14, < 2.0)
@@ -577,6 +580,7 @@ PLATFORMS
577 580
 
578 581
 DEPENDENCIES
579 582
   ace-rails-ap (~> 2.0.1)
583
+  aws-sdk-core (~> 2.2.15)
580 584
   better_errors (~> 1.1)
581 585
   binding_of_caller
582 586
   bootstrap-kaminari-views (~> 0.0.3)

+ 206 - 0
app/models/agents/s3_agent.rb

@@ -0,0 +1,206 @@
1
+module Agents
2
+  class S3Agent < Agent
3
+    include FormConfigurable
4
+    include FileHandling
5
+
6
+    emits_file_pointer!
7
+    no_bulk_receive!
8
+
9
+    default_schedule 'every_1h'
10
+
11
+    gem_dependency_check { defined?(Aws::S3) }
12
+
13
+    description do
14
+      <<-MD
15
+        The S3Agent can watch a bucket for changes or emit an event for every file in that bucket. When receiving events, it writes the data into a file on S3.
16
+
17
+        #{'## Include `aws-sdk-core` in your Gemfile to use this Agent!' if dependencies_missing?}
18
+
19
+        `mode` must be present and either `read` or `write`, in `read` mode the agent checks the S3 bucket for changed files, with `write` it writes received events to a file in the bucket.
20
+
21
+        ### Universal options
22
+
23
+        To use credentials for the `access_key` and `access_key_secret` use the liquid `credential` tag like so `{% credential name-of-credential %}`
24
+
25
+        Select the `region` in which the bucket was created.
26
+
27
+        ### Reading
28
+
29
+        When `watch` is set to `true` the S3Agent will watch the specified `bucket` for changes. An event will be emitted for every detected change.
30
+
31
+        When `watch` is set to `false` the agent will emit an event for every file in the bucket on each sheduled run.
32
+
33
+        #{emitting_file_handling_agent_description}
34
+
35
+        ### Writing
36
+
37
+        Specify the filename to use in `filename`, Liquid interpolation is possible to change the name per event.
38
+
39
+        Use [Liquid](https://github.com/cantino/huginn/wiki/Formatting-Events-using-Liquid) templating in `data` to specify which part of the received event should be written.
40
+      MD
41
+    end
42
+
43
+    event_description do
44
+      "Events will looks like this:\n\n    %s" % if boolify(interpolated['watch'])
45
+        Utils.pretty_print({
46
+          "file_pointer" => {
47
+            "file" => "filename",
48
+            "agent_id" => id
49
+          },
50
+          "event_type" => "modified/added/removed"
51
+        })
52
+      else
53
+        Utils.pretty_print({
54
+          "file_pointer" => {
55
+            "file" => "filename",
56
+            "agent_id" => id
57
+          }
58
+        })
59
+      end
60
+    end
61
+
62
+    def default_options
63
+      {
64
+        'mode' => 'read',
65
+        'access_key_id' => '',
66
+        'access_key_secret' => '',
67
+        'watch' => 'true',
68
+        'bucket' => "",
69
+        'data' => '{{ data }}'
70
+      }
71
+    end
72
+
73
+    form_configurable :mode, type: :array, values: %w(read write)
74
+    form_configurable :access_key_id, roles: :validatable
75
+    form_configurable :access_key_secret, roles: :validatable
76
+    form_configurable :region, type: :array, values: %w(us-east-1 us-west-1 us-west-2 eu-west-1 eu-central-1 ap-southeast-1 ap-southeast-2 ap-northeast-1 ap-northeast-2 sa-east-1)
77
+    form_configurable :watch, type: :array, values: %w(true false)
78
+    form_configurable :bucket, roles: :completable
79
+    form_configurable :filename
80
+    form_configurable :data
81
+
82
+    def validate_options
83
+      if options['mode'].blank? || !['read', 'write'].include?(options['mode'])
84
+        errors.add(:base, "The 'mode' option is required and must be set to 'read' or 'write'")
85
+      end
86
+      if options['bucket'].blank?
87
+        errors.add(:base, "The 'bucket' option is required.")
88
+      end
89
+      if options['region'].blank?
90
+        errors.add(:base, "The 'region' option is required.")
91
+      end
92
+
93
+      case interpolated['mode']
94
+      when 'read'
95
+        if options['watch'].blank? || ![true, false].include?(boolify(options['watch']))
96
+          errors.add(:base, "The 'watch' option is required and must be set to 'true' or 'false'")
97
+        end
98
+      when 'write'
99
+        if options['filename'].blank?
100
+          errors.add(:base, "filename must be specified in 'write' mode")
101
+        end
102
+        if options['data'].blank?
103
+          errors.add(:base, "data must be specified in 'write' mode")
104
+        end
105
+      end
106
+    end
107
+
108
+    def validate_access_key_id
109
+      !!buckets
110
+    end
111
+
112
+    def validate_access_key_secret
113
+      !!buckets
114
+    end
115
+
116
+    def complete_bucket
117
+      (buckets || []).collect { |room| {text: room.name, id: room.name} }
118
+    end
119
+
120
+    def working?
121
+      checked_without_error?
122
+    end
123
+
124
+    def check
125
+      return if interpolated['mode'] != 'read'
126
+      contents = safely do
127
+                   get_bucket_contents
128
+                 end
129
+      if boolify(interpolated['watch'])
130
+        watch(contents)
131
+      else
132
+        contents.each do |key, _|
133
+          create_event payload: get_file_pointer(key)
134
+        end
135
+      end
136
+    end
137
+
138
+    def get_io(file)
139
+      client.get_object(bucket: interpolated['bucket'], key: file).body
140
+    end
141
+
142
+    def receive(incoming_events)
143
+      return if interpolated['mode'] != 'write'
144
+      incoming_events.each do |event|
145
+        safely do
146
+          mo = interpolated(event)
147
+          client.put_object(bucket: mo['bucket'], key: mo['filename'], body: mo['data'])
148
+        end
149
+      end
150
+    end
151
+
152
+    private
153
+
154
+    def safely
155
+      yield
156
+    rescue Aws::S3::Errors::AccessDenied => e
157
+      error("Could not access '#{interpolated['bucket']}' #{e.class} #{e.message}")
158
+    rescue Aws::S3::Errors::ServiceError =>e
159
+      error("#{e.class}: #{e.message}")
160
+    end
161
+
162
+    def watch(contents)
163
+      if last_check_at.nil?
164
+        self.memory['seen_contents'] = contents
165
+        return
166
+      end
167
+
168
+      new_memory = contents.dup
169
+
170
+      memory['seen_contents'].each do |key, etag|
171
+        if contents[key].blank?
172
+          create_event payload: get_file_pointer(key).merge(event_type: :removed)
173
+        elsif contents[key] != etag
174
+          create_event payload: get_file_pointer(key).merge(event_type: :modified)
175
+        end
176
+        contents.delete(key)
177
+      end
178
+      contents.each do |key, etag|
179
+        create_event payload: get_file_pointer(key).merge(event_type: :added)
180
+      end
181
+
182
+      self.memory['seen_contents'] = new_memory
183
+    end
184
+
185
+    def get_bucket_contents
186
+      contents = {}
187
+      client.list_objects(bucket: interpolated['bucket']).each do |response|
188
+        response.contents.each do |file|
189
+          contents[file.key] = file.etag
190
+        end
191
+      end
192
+      contents
193
+    end
194
+
195
+    def client
196
+      @client ||= Aws::S3::Client.new(credentials: Aws::Credentials.new(interpolated['access_key_id'], interpolated['access_key_secret']),
197
+                                      region: interpolated['region'])
198
+    end
199
+
200
+    def buckets(log = false)
201
+      @buckets ||= client.list_buckets.buckets
202
+    rescue Aws::S3::Errors::ServiceError => e
203
+      false
204
+    end
205
+  end
206
+end

+ 220 - 0
spec/models/agents/s3_agent_spec.rb

@@ -0,0 +1,220 @@
1
+require 'rails_helper'
2
+
3
+describe Agents::S3Agent do
4
+  before(:each) do
5
+    @valid_params = {
6
+                      'mode' => 'read',
7
+                      'access_key_id' => '32343242',
8
+                      'access_key_secret' => '1231312',
9
+                      'watch' => 'false',
10
+                      'bucket' => 'testbucket',
11
+                      'region' => 'us-east-1',
12
+                      'filename' => 'test.txt',
13
+                      'data' => '{{ data }}'
14
+                    }
15
+
16
+    @checker = Agents::S3Agent.new(:name => "somename", :options => @valid_params)
17
+    @checker.user = users(:jane)
18
+    @checker.save!
19
+  end
20
+
21
+  describe "#validate_options" do
22
+    it "requires the bucket to be set" do
23
+      @checker.options['bucket'] = ''
24
+      expect(@checker).not_to be_valid
25
+    end
26
+
27
+    it "requires watch to be present" do
28
+      @checker.options['watch'] = ''
29
+      expect(@checker).not_to be_valid
30
+    end
31
+
32
+    it "requires watch to be either 'true' or 'false'" do
33
+      @checker.options['watch'] = 'true'
34
+      expect(@checker).to be_valid
35
+      @checker.options['watch'] = 'false'
36
+      expect(@checker).to be_valid
37
+      @checker.options['watch'] = 'test'
38
+      expect(@checker).not_to be_valid
39
+    end
40
+
41
+    it "requires region to be present" do
42
+      @checker.options['region'] = ''
43
+      expect(@checker).not_to be_valid
44
+    end
45
+
46
+    it "requires mode to be set to 'read' or 'write'" do
47
+      @checker.options['mode'] = 'write'
48
+      expect(@checker).to be_valid
49
+      @checker.options['mode'] = ''
50
+      expect(@checker).not_to be_valid
51
+    end
52
+
53
+    it "requires 'filename' in 'write' mode" do
54
+      @checker.options['mode'] = 'write'
55
+      @checker.options['filename'] = ''
56
+      expect(@checker).not_to be_valid
57
+    end
58
+
59
+    it "requires 'data' in 'write' mode" do
60
+      @checker.options['mode'] = 'write'
61
+      @checker.options['data'] = ''
62
+      expect(@checker).not_to be_valid
63
+    end
64
+  end
65
+
66
+  describe "#validating" do
67
+    it "validates the key" do
68
+      mock(@checker).client { raise Aws::S3::Errors::SignatureDoesNotMatch.new('', '') }
69
+      expect(@checker.validate_access_key_id).to be_falsy
70
+    end
71
+
72
+    it "validates the secret" do
73
+      mock(@checker).buckets { true }
74
+      expect(@checker.validate_access_key_secret).to be_truthy
75
+    end
76
+  end
77
+
78
+  it "completes the buckets" do
79
+    mock(@checker).buckets { [OpenStruct.new(name: 'test'), OpenStruct.new(name: 'test2')]}
80
+    expect(@checker.complete_bucket).to eq([{text: 'test', id: 'test'}, {text: 'test2', id: 'test2'}])
81
+  end
82
+
83
+  context "#working" do
84
+    it "is working with no recent errors" do
85
+      @checker.last_check_at = Time.now
86
+      expect(@checker).to be_working
87
+    end
88
+  end
89
+
90
+  context "#check" do
91
+    context "not watching" do
92
+      it "emits an event for every file" do
93
+        mock(@checker).get_bucket_contents { {"test"=>"231232", "test2"=>"4564545"} }
94
+        expect { @checker.check }.to change(Event, :count).by(2)
95
+        expect(Event.last.payload).to eq({"file_pointer" => {"file"=>"test2", "agent_id"=> @checker.id}})
96
+      end
97
+    end
98
+
99
+    context "watching" do
100
+      before(:each) do
101
+        @checker.options['watch'] = 'true'
102
+      end
103
+
104
+      it "does not emit any events on the first run" do
105
+        contents = {"test"=>"231232", "test2"=>"4564545"}
106
+        mock(@checker).get_bucket_contents { contents }
107
+        expect { @checker.check }.not_to change(Event, :count)
108
+        expect(@checker.memory).to eq('seen_contents' => contents)
109
+      end
110
+
111
+      context "detecting changes" do
112
+        before(:each) do
113
+          contents = {"test"=>"231232", "test2"=>"4564545"}
114
+          mock(@checker).get_bucket_contents { contents }
115
+          expect { @checker.check }.not_to change(Event, :count)
116
+          @checker.last_check_at = Time.now
117
+        end
118
+
119
+        it "emits events for removed files" do
120
+          contents = {"test"=>"231232"}
121
+          mock(@checker).get_bucket_contents { contents }
122
+          expect { @checker.check }.to change(Event, :count).by(1)
123
+          expect(Event.last.payload).to eq({"file_pointer" => {"file" => "test2", "agent_id"=> @checker.id}, "event_type" => "removed"})
124
+        end
125
+
126
+        it "emits events for modified files" do
127
+          contents = {"test"=>"231232", "test2"=>"changed"}
128
+          mock(@checker).get_bucket_contents { contents }
129
+          expect { @checker.check }.to change(Event, :count).by(1)
130
+          expect(Event.last.payload).to eq({"file_pointer" => {"file" => "test2", "agent_id"=> @checker.id}, "event_type" => "modified"})
131
+        end
132
+        it "emits events for added files" do
133
+          contents = {"test"=>"231232", "test2"=>"4564545", "test3" => "31231231"}
134
+          mock(@checker).get_bucket_contents { contents }
135
+          expect { @checker.check }.to change(Event, :count).by(1)
136
+          expect(Event.last.payload).to eq({"file_pointer" => {"file" => "test3", "agent_id"=> @checker.id}, "event_type" => "added"})
137
+        end
138
+      end
139
+
140
+      context "error handling" do
141
+        it "handles AccessDenied exceptions" do
142
+          mock(@checker).get_bucket_contents { raise Aws::S3::Errors::AccessDenied.new('', '') }
143
+          expect { @checker.check }.to change(AgentLog, :count).by(1)
144
+          expect(AgentLog.last.message).to eq("Could not access 'testbucket' Aws::S3::Errors::AccessDenied ")
145
+        end
146
+
147
+        it "handles generic S3 exceptions" do
148
+          mock(@checker).get_bucket_contents { raise Aws::S3::Errors::PermanentRedirect.new('', 'error') }
149
+          expect { @checker.check }.to change(AgentLog, :count).by(1)
150
+          expect(AgentLog.last.message).to eq("Aws::S3::Errors::PermanentRedirect: error")
151
+        end
152
+      end
153
+    end
154
+  end
155
+
156
+  it "get_io returns a StringIO object" do
157
+    stringio =StringIO.new
158
+    mock_response = mock()
159
+    mock(mock_response).body { stringio }
160
+    mock_client = mock()
161
+    mock(mock_client).get_object(bucket: 'testbucket', key: 'testfile') { mock_response }
162
+    mock(@checker).client { mock_client }
163
+    @checker.get_io('testfile')
164
+  end
165
+
166
+  context "#get_bucket_contents" do
167
+    it "returns a hash with the contents of the bucket" do
168
+      mock_response = mock()
169
+      mock(mock_response).contents { [OpenStruct.new(key: 'test', etag: '231232'), OpenStruct.new(key: 'test2', etag: '4564545')] }
170
+      mock_client = mock()
171
+      mock(mock_client).list_objects(bucket: 'testbucket') { [mock_response] }
172
+      mock(@checker).client { mock_client }
173
+      expect(@checker.send(:get_bucket_contents)).to eq({"test"=>"231232", "test2"=>"4564545"})
174
+    end
175
+  end
176
+
177
+  context "#client" do
178
+    it "initializes the S3 client correctly" do
179
+      mock_credential = mock()
180
+      mock(Aws::Credentials).new('32343242', '1231312') { mock_credential }
181
+      mock(Aws::S3::Client).new(credentials: mock_credential,
182
+                                      region: 'us-east-1')
183
+      @checker.send(:client)
184
+    end
185
+  end
186
+
187
+  context "#event_description" do
188
+    it "should include event_type when watch is set to true" do
189
+      @checker.options['watch'] = 'true'
190
+      expect(@checker.event_description).to include('event_type')
191
+    end
192
+
193
+    it "should not include event_type when watch is set to false" do
194
+      @checker.options['watch'] = 'false'
195
+      expect(@checker.event_description).not_to include('event_type')
196
+    end
197
+  end
198
+
199
+  context "#receive" do
200
+    before(:each) do
201
+      @checker.options['mode'] = 'write'
202
+      @checker.options['filename'] = 'file.txt'
203
+      @checker.options['data'] = '{{ data }}'
204
+    end
205
+
206
+    it "writes the data at data into a file" do
207
+      client_mock = mock()
208
+      mock(client_mock).put_object(bucket: @checker.options['bucket'], key: @checker.options['filename'], body: 'hello world!')
209
+      mock(@checker).client { client_mock }
210
+      event = Event.new(payload: {'data' => 'hello world!'})
211
+      @checker.receive([event])
212
+    end
213
+
214
+    it "does nothing when mode is set to 'read'" do
215
+      @checker.options['mode'] = 'read'
216
+      event = Event.new(payload: {'data' => 'hello world!'})
217
+      @checker.receive([event])
218
+    end
219
+  end
220
+end